iT邦幫忙

2023 iThome 鐵人賽

DAY 17
0

持續精煉我們的核心 API

讓我們開始實作其他核心 API 吧!以下為完整程式:

import java.util.concurrent.*

object Par {
  opaque type Par[A] = ExecutorService => Future[A]
	
  private case class UnitFuture[A](get: A) extends Future[A]:
    def isDone = true
    def get(timeout: Long, units: TimeUnit) = get
    def isCancelled = false
    def cancel(evenIfRunning: Boolean): Boolean = false

  def unit[A](a: A): Par[A] =
    _ => UnitFuture(a)
  
  extension[A] (pa: Par[A])
    def run(s: ExecutorService): Future[A] = pa(s)

    def map2[B, C](pb: Par[B])(f: (A, B) => C): Par[C] =
      (es: ExecutorService) =>
        val af = pa(es)
        val bf = pb(es)
        UnitFuture(f(af.get, bf.get))


  def fork[A](a: => Par[A]): Par[A] =
    (es: ExecutorService) =>
      es.submit(new Callable[A] {
        def call = a(es).get
      })

  def lazyUnit[A](a: => A): Par[A] = fork(unit(a))
}

首先 unit 是一個把一般的值轉為 Future,該 Future 不需要使用 ExecutorService 來運行,代表也不需要支持 cancel 功能,Future 的 get 也是直接回傳 unit 的入參,

也因為 Future 並不是純粹的 functional 介面,且它有依賴 side effect 做事,基於此,我們將 Future 類別擴充成新類別 UnitFuture, 讓 unit 的實作就是直接 new UnitFuture 即可;

在來我們用了 extension 來擴展 Par,把 run 和 map2 都放在下面,讓我們能直接以 Par 來呼叫 run 和 map2;

extension 的說明可看 Day 14

map2 也是在取得 Par[A] 和 Par[B] 的結果回傳 UnitFuture;

fork 我們先用一個最簡單和自然的方式來實作。


Exercise D17-1

使用 lazyUnit 實作 asyncF function,asyncF 能使用入參 f: A => B 把回傳值 high-order function 中的 A 轉成 Par[B]。

def asyncF[A, B](f: A => B): A => Par[B]

接下來我們看如何透過核心 API 來做各種組合操作,假設我們有 Par[List[Int]],然後我們想針對 Par 中的 List 做排序,

def sortPar(parList: Par[List[Int]]): Par[List[Int]]

首先我們當然可以 run Par,取得 List 結果後在排序,最後在用 unit 包裹起來,但我們應當避免調用 run,目前核心 API 中能 Par 值的 function 是 map2,所以我們應該用它來實現,

def sortPar(parList: Par[List[Int]]): Par[List[Int]] =
  parList.map2(unit(()))((a, _) => a.sorted)

這裡可以看到 map2 的另一個參數不重要,所以我們用 unit(()) 來產生個空 Par 給 map2,既然我們只在意一個參數,我們可以抽象這個概念提升成 map function(我將此 map function 放到 extension 下),

def map[B](f: A => B): Par[B] =
  map2(unit(()))((a, _) => f(a))

現在我們的 sortPar 可以這樣實現。

def sortPar(parList: Par[List[Int]]): Par[List[Int]] =
  parList.map(_.sorted)

更多的組合

我們可以使用 f: A => B 來平行化運行的轉換 list 中所有元素嗎?不像 map2 是合併 2 個 Par 成一個 Par,這裡需要合併 N 個 Par 成一個 Par,首先我們可以用剛剛的 asyncF 把 List[A] 轉成新的 List[Par[B]],但下一步呢?

def parMap[A, B](ps: List[A])(f: A => B): Par[List[B]] = 
	val fbs: List[Par[B]] = ps.map(asyncF(f))
  ???

看起來我們需要個 function 把 List[Par[B]] 轉成 Par[List[B]]

def sequence[A](pas: List[Par[A]]): Par[List[A]] =
  pas.foldRight(unit(List.empty[A]))((pa, acc) => pa.map2(acc)(_ :: _))

sequence 使用了 foldRight 來和 map2 來合併 List 中的所有值,

foldRight 的說明可以看 Day 4

現在我們可以完善 parMap 了,這裡我們多調用了 fork 來讓它使用新的邏輯執行緒運行,即使我們的入參是很大的 List,parMap 會立即回傳 Par[List[B]] 回來。

def parMap[A, B](ps: List[A])(f: A => B): Par[List[B]] = fork:
  val fbs: List[Par[B]] = ps.map(asyncF(f))
  sequence(fbs)

Exercise D17-2

(Hard) 實作 parFilter,它能平行化的過濾 List 中所有元素。

def parFilter[A](l: List[A])(f: A => Boolean): Par[List[A]]

Purely Function 的平行化 還沒完喔,明天繼續!


Day 17 - Exercise answer


上一篇
Purely Function 的平行化 (2)
下一篇
Purely Function 的平行化 (4)
系列文
用 Scala 3 寫的 Functional Programming 會長什麼樣子?30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言